/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.redis;

import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.util.concurrent.Future;

import org.apache.geode.InternalGemFireError;
import org.apache.geode.LogWriter;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.xmlcache.RegionAttributesCreation;
import org.apache.geode.internal.hll.HyperLogLogPlus;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.ByteToCommandDecoder;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.KeyRegistrar;
import org.apache.geode.redis.internal.RedisDataType;
import org.apache.geode.redis.internal.RegionProvider;

/**
 * The GeodeRedisServer is a server that understands the Redis protocol. As commands are sent to the
 * server, each command is picked up by a thread, interpreted and then executed and a response is
 * sent back to the client. The default connection port is 6379 but that can be altered when run
 * through GFSH or started through the provided static main class.
 * <p>
 * Each Redis data type instance is stored in a separate {@link Region} except for the Strings and
 * HyperLogLogs which are collectively stored in one Region respectively. That Region along with a
 * meta data region used internally are protected so the client may not store keys with the name
 * {@link GeodeRedisServer#REDIS_META_DATA_REGION} or {@link GeodeRedisServer#STRING_REGION}. The
 * default Region type is {@link RegionShortcut#PARTITION} although this can be changed by
 * specifying the SystemProperty {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by
 * {@link RegionShortcut}. If the {@link GeodeRedisServer#NUM_THREADS_SYS_PROP_NAME} system property
 * is set to 0, one thread per client will be created. Otherwise a worker thread pool of specified
 * size is used or a default size of 4 * {@link Runtime#availableProcessors()} if the property is
 * not set.
 * <p>
 * Setting the AUTH password requires setting the property "redis-password" just as "redis-port"
 * would be in xml or through GFSH.
 * <p>
 * The supported commands are as follows:
 * <p>
 * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, GET, GETBIT, GETRANGE,
 * GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX, PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN
 * <p>
 * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP,
 * RPUSH, RPUSHX
 * <p>
 * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HMGET,
 * HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS
 * <p>
 * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER,
 * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SCAN, SUNION, SUNIONSTORE
 * <p>
 * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT, ZRANGE, ZRANGEBYLEX,
 * ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE,
 * ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE
 * <p>
 * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE
 * <p>
 * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, KEYS,
 * PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL
 * <p>
 * Supported Transaction commands - DISCARD, EXEC, MULTI
 * <P>
 * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT
 * <p>
 * <p>
 * The command executors are not explicitly documented but the functionality can be found at
 * <a href="http://redis.io/commands">Redis Commands</a>
 * <p>
 * Exceptions to the Redis Commands Documents:
 * <p>
 * <ul>
 * <li>Any command that removes keys and returns a count of removed entries will not return a total
 * remove count but rather a count of how many entries have been removed that existed on the local
 * vm, though all entries will be removed</li>
 * <li>Any command that returns a count of newly set members has an unspecified return value. The
 * command will work just as the Redis protocol states but the count will not necessary reflect the
 * number set compared to overridden.</li>
 * <li>Transactions work just as they would on a Redis instance, they are local transaction.
 * Transactions cannot be executed on data that is not local to the executing server, that is on a
 * partitioned region in a different server instance or on a persistent region that does not have
 * transactions enabled. Also, you cannot watch or unwatch keys as all keys within a GemFire
 * transaction are watched by default.</li>
 * </ul>
 *
 */

@Experimental
public class GeodeRedisServer {
  /**
   * Thread used to start main method
   */
  @MakeNotStatic
  private static Thread mainThread = null;

  /**
   * The default Redis port as specified by their protocol, {@code DEFAULT_REDIS_SERVER_PORT}
   */
  public static final int DEFAULT_REDIS_SERVER_PORT = 6379;

  /**
   * The number of threads that will work on handling requests
   */
  private final int numWorkerThreads;

  /**
   * The number of threads that will work socket selectors
   */
  private final int numSelectorThreads;

  /**
   * The actual port being used by the server
   */
  private final int serverPort;

  /**
   * The address to bind to
   */
  private final String bindAddress;

  /**
   * Connection timeout in milliseconds
   */
  private static final int connectTimeoutMillis = 1000;

  /**
   * Temporary constant whether to use old single thread per connection model for worker group
   */
  private boolean singleThreadPerConnection;

  /**
   * Logging level
   */
  private final String logLevel;

  /**
   * The cache instance pointer on this vm
   */
  private Cache cache;

  /**
   * Channel to be closed when shutting down
   */
  private Channel serverChannel;

  /**
   * Gem logwriter
   */
  private LogWriter logger;

  private RegionProvider regionCache;

  private final MetaCacheListener metaListener;

  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;
  private static final int numExpirationThreads = 1;
  private final ScheduledExecutorService expirationExecutor;

  /**
   * Map of futures to be executed for key expirations
   */
  private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures;


  /**
   * The field that defines the name of the {@link Region} which holds all of the strings. The
   * current value of this field is {@code STRING_REGION}.
   */
  public static final String STRING_REGION = "ReDiS_StRiNgS";

  /**
   * The field that defines the name of the {@link Region} which holds all of the HyperLogLogs. The
   * current value of this field is {@code HLL_REGION}.
   */
  public static final String HLL_REGION = "ReDiS_HlL";

  /**
   * The field that defines the name of the {@link Region} which holds all of the Redis meta data.
   * The current value of this field is {@code REDIS_META_DATA_REGION}.
   */
  public static final String REDIS_META_DATA_REGION = "ReDiS_MeTa_DaTa";

  /**
   * The system property name used to set the default {@link Region} creation type. The property
   * name is {@code DEFAULT_REGION_SYS_PROP_NAME} and the acceptable values are types defined by
   * {@link RegionShortcut}, i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}.
   */
  public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype";

  /**
   * System property name that can be used to set the number of threads to be used by the
   * GeodeRedisServer
   */
  public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads";

  /**
   * The actual {@link RegionShortcut} type specified by the system property
   * {@value #DEFAULT_REGION_SYS_PROP_NAME}.
   */
  public final RegionShortcut DEFAULT_REGION_TYPE;

  private boolean shutdown;
  private boolean started;
  private KeyRegistrar keyRegistrar;

  /**
   * Determine the {@link RegionShortcut} type from a String value. If the String value doesn't map
   * to a RegionShortcut type then {@link RegionShortcut#PARTITION} will be used by default.
   *
   * @return {@link RegionShortcut}
   */
  private static RegionShortcut setRegionType() {
    String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");
    RegionShortcut type;
    try {
      type = RegionShortcut.valueOf(regionType);
    } catch (Exception e) {
      type = RegionShortcut.PARTITION;
    }
    return type;
  }

  /**
   * Helper method to set the number of worker threads
   *
   * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number is
   *         used, otherwise 4 * # of cores
   */
  private int setNumWorkerThreads() {
    String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME);
    int numCores = Runtime.getRuntime().availableProcessors();
    int def = 4 * numCores;
    if (prop == null || prop.isEmpty())
      return def;
    int threads;
    try {
      threads = Integer.parseInt(prop);
    } catch (NumberFormatException e) {
      return def;
    }
    return threads;
  }

  /**
   * Constructor for {@code GeodeRedisServer} that will start the server on the given port and bind
   * to the first non-loopback address
   *
   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by
   *        default
   */
  public GeodeRedisServer(int port) {
    this(null, port, null);
  }

  /**
   * Constructor for {@code GeodeRedisServer} that will start the server and bind to the given
   * address and port
   *
   * @param bindAddress The address to which the server will attempt to bind to
   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by
   *        default if argument is less than or equal to 0
   */
  public GeodeRedisServer(String bindAddress, int port) {
    this(bindAddress, port, null);
  }

  /**
   * Constructor for {@code GeodeRedisServer} that will start the server and bind to the given
   * address and port. Keep in mind that the log level configuration will only be set if a
   * {@link Cache} does not already exist, if one already exists then setting that property will
   * have no effect.
   *
   * @param bindAddress The address to which the server will attempt to bind to
   * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by
   *        default if argument is less than or equal to 0
   * @param logLevel The logging level to be used by GemFire
   */
  public GeodeRedisServer(String bindAddress, int port, String logLevel) {
    this.serverPort = port <= 0 ? DEFAULT_REDIS_SERVER_PORT : port;
    this.bindAddress = bindAddress;
    this.logLevel = logLevel;
    this.numWorkerThreads = setNumWorkerThreads();
    this.singleThreadPerConnection = this.numWorkerThreads == 0;
    this.numSelectorThreads = 1;
    this.metaListener = new MetaCacheListener();
    this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>();
    this.expirationExecutor =
        Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() {
          private final AtomicInteger counter = new AtomicInteger();

          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
          }

        });
    this.DEFAULT_REGION_TYPE = setRegionType();
    this.shutdown = false;
    this.started = false;
  }

  /**
   * Helper method to get the host name to bind to
   *
   * @return The InetAddress to bind to
   */
  private InetAddress getBindAddress() throws UnknownHostException {
    return this.bindAddress == null || this.bindAddress.isEmpty() ? LocalHostUtil.getLocalHost()
        : InetAddress.getByName(this.bindAddress);
  }

  /**
   * This is function to call on a {@code GeodeRedisServer} instance to start it running
   */
  public synchronized void start() {
    if (!started) {
      try {
        startGemFire();
        initializeRedis();
        startRedisServer();
      } catch (IOException e) {
        throw new RuntimeException("Could not start Server", e);
      } catch (InterruptedException e) {
        throw new RuntimeException("Could not start Server", e);
      }
      started = true;
    }
  }

  /**
   * Initializes the {@link Cache}, and creates Redis necessities Region and protects declares that
   * {@link Region} to be protected. Also, every {@code GeodeRedisServer} will check for entries
   * already in the meta data Region.
   */
  private void startGemFire() {
    Cache cache = GemFireCacheImpl.getInstance();
    if (cache == null) {
      synchronized (GeodeRedisServer.class) {
        cache = GemFireCacheImpl.getInstance();
        if (cache == null) {
          CacheFactory cacheFactory = new CacheFactory();
          if (logLevel != null)
            cacheFactory.set(LOG_LEVEL, logLevel);
          cache = cacheFactory.create();
        }
      }
    }
    this.cache = cache;
    this.logger = cache.getLogger();
  }

  private void initializeRedis() {
    synchronized (this.cache) {
      Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;

      Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
      Region<String, RedisDataType> redisMetaData;
      InternalCache gemFireCache = (InternalCache) cache;
      try {
        if ((stringsRegion = cache.getRegion(STRING_REGION)) == null) {
          RegionFactory<ByteArrayWrapper, ByteArrayWrapper> regionFactory =
              gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE);
          stringsRegion = regionFactory.create(STRING_REGION);
        }
        if ((hLLRegion = cache.getRegion(HLL_REGION)) == null) {
          RegionFactory<ByteArrayWrapper, HyperLogLogPlus> regionFactory =
              gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE);
          hLLRegion = regionFactory.create(HLL_REGION);
        }
        if ((redisMetaData = cache.getRegion(REDIS_META_DATA_REGION)) == null) {
          RegionAttributesCreation regionAttributesCreation = new RegionAttributesCreation();
          regionAttributesCreation.addCacheListener(metaListener);
          regionAttributesCreation.setDataPolicy(DataPolicy.REPLICATE);
          InternalRegionArguments ira =
              new InternalRegionArguments().setInternalRegion(true).setIsUsedForMetaRegion(true);
          redisMetaData =
              gemFireCache.createVMRegion(REDIS_META_DATA_REGION, regionAttributesCreation, ira);
        }
      } catch (IOException | ClassNotFoundException e) {
        // only if loading snapshot, not here
        InternalGemFireError assErr = new InternalGemFireError(
            "unexpected exception");
        assErr.initCause(e);
        throw assErr;
      }
      this.keyRegistrar = new KeyRegistrar(redisMetaData);
      this.regionCache = new RegionProvider(stringsRegion, hLLRegion, this.keyRegistrar,
          expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
      redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
      redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
      redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
    }
    checkForRegions();
  }

  private void checkForRegions() {
    Collection<Entry<String, RedisDataType>> entrySet = keyRegistrar.keyInfos();
    for (Entry<String, RedisDataType> entry : entrySet) {
      String regionName = entry.getKey();
      RedisDataType type = entry.getValue();
      Region<?, ?> newRegion = cache.getRegion(regionName);
      if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL
          && type != RedisDataType.REDIS_PROTECTED) {
        try {
          this.regionCache
              .createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type);
        } catch (Exception e) {
          if (logger.errorEnabled())
            logger.error(e);
        }
      }
    }
  }

  /**
   * Helper method to start the server listening for connections. The server is bound to the port
   * specified by {@link GeodeRedisServer#serverPort}
   *
   */
  private void startRedisServer() throws IOException, InterruptedException {
    ThreadFactory selectorThreadFactory = new ThreadFactory() {
      private final AtomicInteger counter = new AtomicInteger();

      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("GeodeRedisServer-SelectorThread-" + counter.incrementAndGet());
        t.setDaemon(true);
        return t;
      }
    };

    ThreadFactory workerThreadFactory = new ThreadFactory() {
      private final AtomicInteger counter = new AtomicInteger();

      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("GeodeRedisServer-WorkerThread-" + counter.incrementAndGet());
        return t;
      }
    };

    bossGroup = null;
    workerGroup = null;
    Class<? extends ServerChannel> socketClass = null;
    if (singleThreadPerConnection) {
      bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory);
      workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory);
      socketClass = OioServerSocketChannel.class;
    } else {
      bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory);
      workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory);
      socketClass = NioServerSocketChannel.class;
    }
    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
    String pwd = system.getConfig().getRedisPassword();
    final byte[] pwdB = Coder.stringToBytes(pwd);
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(socketClass)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
            if (logger.fineEnabled())
              logger.fine("GeodeRedisServer-Connection established with " + ch.remoteAddress());
            ChannelPipeline p = ch.pipeline();
            p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
            p.addLast(ExecutionHandlerContext.class.getSimpleName(),
                new ExecutionHandlerContext(ch, cache, regionCache, GeodeRedisServer.this, pwdB,
                    keyRegistrar));
          }
        }).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_RCVBUF, getBufferSize())
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GeodeRedisServer.connectTimeoutMillis)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    // Bind and start to accept incoming connections.
    ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync();
    if (this.logger.infoEnabled()) {
      String logMessage = "GeodeRedisServer started {" + getBindAddress() + ":" + serverPort
          + "}, Selector threads: " + this.numSelectorThreads;
      if (this.singleThreadPerConnection)
        logMessage += ", One worker thread per connection";
      else
        logMessage += ", Worker threads: " + this.numWorkerThreads;
      this.logger.info(logMessage);
    }
    this.serverChannel = f.channel();
  }

  /**
   * Takes an entry event and processes it. If the entry denotes that a
   * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET} was created then this
   * function will call the necessary calls to create the parameterized queries for those keys.
   *
   * @param event EntryEvent from meta data region
   */
  private void afterKeyCreate(EntryEvent<String, RedisDataType> event) {
    if (event.isOriginRemote()) {
      final String key = (String) event.getKey();
      final RedisDataType value = event.getNewValue();
      if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL
          && value != RedisDataType.REDIS_PROTECTED) {
        try {
          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key),
              value);
        } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore
        }
      }
    }
  }

  /**
   * When a key is removed then this function will make sure the associated queries with the key are
   * also removed from each vm to avoid unnecessary data retention
   */
  private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) {
    if (event.isOriginRemote()) {
      final String key = (String) event.getKey();
      final RedisDataType value = event.getOldValue();
      if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL
          && value != RedisDataType.REDIS_PROTECTED) {
        ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key);
        Region<?, ?> r = this.regionCache.getRegion(kW);
        if (r != null) {
          this.regionCache.removeRegionReferenceLocally(kW, value);
        }
      }
    }
  }

  private class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> {
    @Override
    public void afterCreate(EntryEvent<String, RedisDataType> event) {
      afterKeyCreate(event);
    }

    @Override
    public void afterDestroy(EntryEvent<String, RedisDataType> event) {
      afterKeyDestroy(event);
    }
  }

  /**
   * Helper method to get GemFire set socket buffer size, possibly a default of 32k
   *
   * @return Buffer size to use for server
   */
  private int getBufferSize() {
    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
    return system.getConfig().getSocketBufferSize();
  }

  /**
   * Shutdown method for {@code GeodeRedisServer}. This closes the {@link Cache}, interrupts all
   * execution and forcefully closes all connections.
   */
  public synchronized void shutdown() {
    if (!shutdown) {
      if (logger.infoEnabled())
        logger.info("GeodeRedisServer shutting down");
      ChannelFuture closeFuture = this.serverChannel.closeFuture();
      Future<?> c = workerGroup.shutdownGracefully();
      Future<?> c2 = bossGroup.shutdownGracefully();
      this.serverChannel.close();
      c.syncUninterruptibly();
      c2.syncUninterruptibly();
      this.regionCache.close();
      if (mainThread != null)
        mainThread.interrupt();
      for (ScheduledFuture<?> f : this.expirationFutures.values())
        f.cancel(true);
      this.expirationFutures.clear();
      this.expirationExecutor.shutdownNow();
      closeFuture.syncUninterruptibly();
      shutdown = true;
    }
  }

  /**
   * Static main method that allows the {@code GeodeRedisServer} to be started from the command
   * line. The supported command line arguments are
   * <p>
   * -port= <br>
   * -bind-address= <br>
   * -log-level=
   *
   * @param args Command line args
   */
  public static void main(String[] args) {
    int port = DEFAULT_REDIS_SERVER_PORT;
    String bindAddress = null;
    String logLevel = null;
    for (String arg : args) {
      if (arg.startsWith("-port"))
        port = getPort(arg);
      else if (arg.startsWith("-bind-address"))
        bindAddress = getBindAddress(arg);
      else if (arg.startsWith("-log-level"))
        logLevel = getLogLevel(arg);
    }
    mainThread = Thread.currentThread();
    GeodeRedisServer server = new GeodeRedisServer(bindAddress, port, logLevel);
    server.start();
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e1) {
        break;
      } catch (Exception e) {
      }
    }
  }

  /**
   * Helper method to parse the port to a number
   *
   * @param arg String where the argument is
   * @return The port number when the correct syntax was used, otherwise will return
   *         {@link #DEFAULT_REDIS_SERVER_PORT}
   */
  private static int getPort(String arg) {
    int port = DEFAULT_REDIS_SERVER_PORT;
    if (arg != null && arg.length() > 6) {
      if (arg.startsWith("-port")) {
        String p = arg.substring(arg.indexOf('=') + 1);
        p = p.trim();
        try {
          port = Integer.parseInt(p);
        } catch (NumberFormatException e) {
          System.out.println("Unable to parse port, using default port");
        }
      }
    }
    return port;
  }

  /**
   * Helper method to parse bind address
   *
   * @param arg String holding bind address
   * @return Bind address
   */
  private static String getBindAddress(String arg) {
    String address = null;
    if (arg != null && arg.length() > 14) {
      if (arg.startsWith("-bind-address")) {
        String p = arg.substring(arg.indexOf('=') + 1);
        address = p.trim();
      }
    }
    return address;
  }

  /**
   * Helper method to parse log level
   *
   * @param arg String holding log level
   * @return Log level
   */
  private static String getLogLevel(String arg) {
    String logLevel = null;
    if (arg != null && arg.length() > 11) {
      if (arg.startsWith("-log-level")) {
        String p = arg.substring(arg.indexOf('=') + 1);
        logLevel = p.trim();
      }
    }
    return logLevel;
  }
}
